NIO 您所在的位置:网站首页 java selector底层epoll NIO

NIO

2024-07-17 22:19| 来源: 网络整理| 查看: 265

目录NIO-EPollSelectorIpml源码分析目录前言初始化EPollSelectorProvider创建EPollSelectorImplEPollSelectorImpl结构fdToKey管道文件描述符EPollArrayWrapper创建EPoll文件描述符初始化epoll_event数组注册doSelect关闭EpollSelectorImpl总结相关文献

NIO-EPollSelectorIpml源码分析 目录

NIO-概览 NIO-Buffer NIO-Channel NIO-Channel接口分析 NIO-SocketChannel源码分析 NIO-FileChannel源码分析 NIO-Selector源码分析 NIO-WindowsSelectorImpl源码分析 NIO-EPollSelectorIpml源码分析

前言

本来是想学习Netty的,但是Netty是一个NIO框架,因此在学习netty之前,还是先梳理一下NIO的知识。通过剖析源码理解NIO的设计原理。

本系列文章针对的是JDK1.8.0.161的源码。

NIO-Selector源码分析对Selector的功能和创建过程进行了分析,本篇对Linux环境下JDK实现的EPollSelectorImpl源码进行详细讲解。

本篇文章不会对EPoll算法进行详细介绍,对epoll算法感兴趣或还不了解的同学可以看epoll原理详解及epoll反应堆模型先进行学习。

在详细介绍EpollSelectorProvider之前我们先了解一下EPoll主要的三个步骤:

调用epoll_create建立一个epoll 对象(在epoll文件系统中给这个句柄分配资源); 调用epoll_ctl向epoll对象中添加或删除文件句柄及监控事件。 调用epoll_wait收集发生事件的文件描述符。 初始化EPollSelectorProvider

NIO-Selector源码分析提到,若没有进行配置时,默认通过sun.nio.ch.DefaultSelectorProvider.create()创建SelectorProvider。Linux下的代码路径在jdk\src\solaris\classes\sun\nio\ch\DefaultSelectorProvider.java。在其内部通过实际是创建了一个EPollSelectorProvider。

创建EPollSelectorImpl

EPollSelectorProvider是用于创建EPollSelectorImpl的。

Selector.Open()-> SelectorProvider.provider()-> sun.nio.ch.DefaultSelectorProvider.create()-> EPollSelectorProvider.openSelector()-> new EPollSelectorImpl(this) public class EPollSelectorProvider extends SelectorProviderImpl { public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } public Channel inheritedChannel() throws IOException { return InheritedChannel.getChannel(); } }

inheritedChannel()可以返回系统默认SelectorProvider创建的通道,主要有些操作系统底层需要调用默认的通道。

EPollSelectorImpl结构

在详细讲解EPollSelectorImpl源码之前,先了解EPollSelectorImpl的主要的数据结构和属性。

名称 作用 Map fdToKey 保存文件描述符句柄和的SelectionKey的映射关系 int fd0 管道的读端文件描述符 int fd1 管道的写端文件描述符 EPollArrayWrapper pollWrapper 调用底层Epoll算法的包装类 EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); //无符号移位 fd1 = (int) pipeFds; pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap(); } void initInterrupt(int fd0, int fd1) { outgoingInterruptFD = fd1; incomingInterruptFD = fd0; //将管道的读取端注册 epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); }

pipeFds高32位存放的是通道read端的文件描述符FD,低32位存放的是write端的文件描述符。这里做移位处理。

通过调用JNI的makePipe方法创建单向管道。

JNIEXPORT jlong JNICALL Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking) { int fd[2]; if (pipe(fd) < 0) { JNU_ThrowIOExceptionWithLastError(env, "Pipe failed"); return 0; } if (blocking == JNI_FALSE) { //配置阻塞 if ((configureBlocking(fd[0], JNI_FALSE) < 0) || (configureBlocking(fd[1], JNI_FALSE) < 0)) { JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); close(fd[0]); close(fd[1]); return 0; } } //高32位存读端,低32位存写端 return ((jlong) fd[0] 64k if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap(); } //最大不超过64K private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged( new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));

EPollArrayWrapper内部会为维护两个结构,当句柄值小于MAX_UPDATE_ARRAY_SIZE时会保存到数组结构中。否则会存储到Map中。主要是优化效率。

private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged( new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024))); 通过epollCreate方法创建epoll文件描述符,JNI调用底层的epoll_create方法。传入的参数位最大注册的socket fd数量。 JNIEXPORT jint JNICALL Java_sun_nio_ch_EPoll_epollCreate(JNIEnv *env, jclass c) { /* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */ int epfd = epoll_create(256); if (epfd < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed"); } return epfd; }

epoll_create用于创建EPoll事件所需的内存空间,默认为256,在Linux 2.6.8以后,传入的size就没用了,底层会动态调整所需数据结构的大小。详情可以看下epoll_create的方法描述

初始化epoll_event数组

epfd创建完后,创建epoll_event的数组,首先查询epoll_event结构的大小

private static final int SIZE_EPOLLEVENT = sizeofEPollEvent(); Java_sun_nio_ch_EPollArrayWrapper_sizeofEPollEvent(JNIEnv* env, jclass this) { return sizeof(struct epoll_event); }

查询配置的文件描述符最大数量

private static final int OPEN_MAX = IOUtil.fdLimit(); private static final int NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192); Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this) { struct rlimit rlp; if (getrlimit(RLIMIT_NOFILE, &rlp) < 0) { JNU_ThrowIOExceptionWithLastError(env, "getrlimit failed"); return -1; } if (rlp.rlim_max < 0 || rlp.rlim_max > java_lang_Integer_MAX_VALUE) { return java_lang_Integer_MAX_VALUE; } else { return (jint)rlp.rlim_max; } }

getrlimit用于获取资源使用限制,RLIMIT_NOFILE获取最大文件打开数量。对于getrlimit详细介绍可以看一下 Linux系统调用--getrlimit()与setrlimit()函数详解

根据查询到的epoll_event结构大小和数量初始化数组大小。

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true);

在 EPollArrayWrapper 内部使用 AllocatedNativeObject对象创建的堆外(native)内存对象。 将数组的首地址保存到pollArrayAddress中,在调用epollWait的时候需要传递该参数给JNI。

和Windows的PollArrayWrapper一样,EPollArrayWrapper也暴露了读写FD和Event的方法供EPollSelectorImpl使用。

void putEventOps(int i, int event) { int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET; pollArray.putInt(offset, event); } void putDescriptor(int i, int fd) { int offset = SIZE_EPOLLEVENT * i + FD_OFFSET; pollArray.putInt(offset, fd); } int getEventOps(int i) { int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET; return pollArray.getInt(offset); } int getDescriptor(int i) { int offset = SIZE_EPOLLEVENT * i + FD_OFFSET; return pollArray.getInt(offset); } 注册 protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; //获取通道的句柄 int fd = Integer.valueOf(ch.getFDVal()); //加入到缓存中 fdToKey.put(fd, ski); //加入到数组缓存 pollWrapper.add(fd); keys.add(ski); } 在注册的时候会将SelectionKey加入到fdToKey和keys,同时会将文件描述符加入到pollWrapper pollWrapper.add(fd); void add(int fd) { // force the initial update events to 0 as it may be KILLED by a // previous registration. synchronized (updateLock) { assert !registered.get(fd); //初始化事件掩码为0 setUpdateEvents(fd, (byte)0, true); } } private void setUpdateEvents(int fd, byte events, boolean force) { //小于MAX_UPDATE_ARRAY_SIZE存到数组中 if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { //大于MAX_UPDATE_ARRAY_SIZE存到map中 Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } } private boolean isEventsHighKilled(Integer key) { assert key >= MAX_UPDATE_ARRAY_SIZE; Byte value = eventsHigh.get(key); return (value != null && value == KILLED); }

若文件描述符的值为KILLED(-1)时,该管道被释放。不再加入。如上面所述,这里会根据key的大小存放到mapeventsHigh或字节数组eventsLow中。

在调用poll的时候才会调用epollCtl进行注册。

int poll(long timeout) throws IOException { //更新epoll事件,实际调用`epollCtl`加入到epollfd中 updateRegistrations(); ... } private void updateRegistrations() { synchronized (updateLock) { int j = 0; while (j < updateCount) { int fd = updateDescriptors[j]; short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { //已经注册过 if (isRegistered) { //修改或删除 opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { //新增 opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { epollCtl(epfd, opcode, fd, events); if (opcode == EPOLL_CTL_ADD) { //增加到registered缓存是否已注册 registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } } private byte getUpdateEvents(int fd) { if (fd < MAX_UPDATE_ARRAY_SIZE) { return eventsLow[fd]; } else { Byte result = eventsHigh.get(Integer.valueOf(fd)); // result should never be null return result.byteValue(); } } doSelect protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); //1. 删除取消的key processDeregisterQueue(); try { begin(); //2. 获取就绪文件描述符 pollWrapper.poll(timeout); } finally { end(); } //3. 再次删除取消的key processDeregisterQueue(); //4. 将就绪的key加入到selectedKeys中 int numKeysUpdated = updateSelectedKeys(); //5. 若管道被唤醒清理唤醒的数据 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } 删除取消的key,当channel关闭时,对应的Key会被取消,被取消的key会加入到cancelledKeys中。调用processDeregisterQueue遍历所有的key进行卸载。 processDeregisterQueue(); //遍历所有已取消的key,取消他们 void processDeregisterQueue() throws IOException { // Precondition: Synchronized on this, keys, and selectedKeys Set cks = cancelledKeys(); //遍历每个key调用卸载 implDereg(ski); } protected void implDereg(SelectionKeyImpl ski) throws IOException { assert (ski.getIndex() >= 0); SelChImpl ch = ski.channel; int fd = ch.getFDVal(); //根据文件句柄值移除 fdToKey.remove(Integer.valueOf(fd)); //从堆外内存溢出epoll_event结构 pollWrapper.remove(fd); ski.setIndex(-1); keys.remove(ski); selectedKeys.remove(ski); //将key设置为无效 deregister((AbstractSelectionKey)ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }

从pollWrapper移除,会将句柄值设置为KILLED(-1)

pollWrapper.remove(fd); void remove(int fd) { synchronized (updateLock) { //设置实现值为-1 取消 setUpdateEvents(fd, KILLED, false); // remove from epoll if (registered.get(fd)) { //从epool对象中删除 epollCtl(epfd, EPOLL_CTL_DEL, fd, 0); registered.clear(fd); } } }

EPOLL_CTL_DEL操作符将文件描述符从epoll fd中移除。

获取就绪文件描述符

通过调用epollWait方法,获取到已就绪的文件描述符,存放在pollArrayAddress地址中。

pollWrapper.poll(timeout); int poll(long timeout) throws IOException { //更新epoll事件,实际调用`epollCtl`加入到epollfd中 updateRegistrations(); //获取已就绪的文件句柄 updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); //如是唤醒文件句柄,则跳过,设置interrupted=true for (int i=0; i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有